ea5f7983c73c3de59bc2376ba809c714c4811fc7,debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java,BinlogReader,handleUpdate,#Event#,465

Before Change


                Serializable[] after = changes.getValue();
                count += recordMaker.update(before, after, ts, row, numRows);
            }
            logger.debug("Recorded {} update records for event: {}", count, event);
        } else {
            logger.debug("Skipping update row event: {}", event);
        }

After Change


     * @throws InterruptedException if this thread is interrupted while blocking
     */
    protected void handleUpdate(Event event) throws InterruptedException {
        if (skipEvent) {
            // We can skip this because we should already be at least this far ...
            logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        UpdateRowsEventData update = unwrapData(event);
        long tableNumber = update.getTableId();
        BitSet includedColumns = update.getIncludedColumns();
        // BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
        RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);
        if (recordMaker != null) {
            List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
            Long ts = context.clock().currentTimeInMillis();
            int count = 0;
            int numRows = rows.size();
            if (startingRowNumber < numRows) {
                for (int row = startingRowNumber; row != numRows; ++row) {
                    Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
                    Serializable[] before = changes.getKey();
                    Serializable[] after = changes.getValue();
                    count += recordMaker.update(before, after, ts, row, numRows);
                }
                if (logger.isDebugEnabled()) {
                    if (startingRowNumber != 0) {
                        logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}",
                                     count, numRows - startingRowNumber, event);
                    } else {
                        logger.debug("Recorded {} update record(s) for event: {}", count, event);
                    }
                }
            } else {
                // All rows were previously processed ...
                logger.debug("Skipping previously processed update event: {}", event);
            }
        } else {
            logger.debug("Skipping update row event: {}", event);